package com.dahuan.state;

import com.dahuan.bean.SensorReading;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;



public class State_KeyedApplicationCaseState {
    /**
     * Builds and runs the streaming job.
     *
     * <p>The job reads text lines from a socket on {@code localhost:7777}, parses each line
     * as three comma-separated fields into a {@link SensorReading}, keys the stream by
     * sensor id and prints every warning emitted by {@link TempChangeWarning} (threshold 10.0).
     *
     * <p>Each line is presumably {@code id,timestamp,temperature}; the second field must parse
     * as a long and the third as a double. A malformed line makes the map function throw,
     * which fails the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );
        // Use event time as the stream time characteristic.
        env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime );
        // Interval between automatic watermark emissions, in milliseconds.
        env.getConfig().setAutoWatermarkInterval( 100 );

        DataStream<String> inputStream = env.socketTextStream( "localhost", 7777 );

        DataStream<SensorReading> dataStream = inputStream.map( data -> {
            String[] split = data.split( "," );
            // valueOf replaces the deprecated boxed-type constructors (new Long / new Double).
            return new SensorReading( split[0], Long.valueOf( split[1] ), Double.valueOf( split[2] ) );
        } );

        // Detect temperature jumps per sensor and emit a warning for each one.
        // A typed key selector replaces the string-based field lookup keyBy("id").
        SingleOutputStreamOperator<Tuple3<String, Double, Double>> resultStream = dataStream
                .keyBy( SensorReading::getId )
                .flatMap( new TempChangeWarning( 10.0 ) );

        resultStream.print();
        env.execute("State_KeyedApplicationCaseState");

    }

//    public static class JDBCMysql extends RichSinkFunction<SensorReading>{
//        Connection conn = null;
//        PreparedStatement p
//    }




    /**
     * Flat-map function for a keyed stream that warns about temperature jumps.
     *
     * <p>For every key it remembers the temperature of the previous reading in keyed
     * {@link ValueState}. When a new reading differs from the remembered one by at least
     * {@code threshold} (absolute difference), it emits
     * {@code (id, previousTemperature, currentTemperature)}. The first reading of a key
     * never produces output because there is nothing to compare against.
     *
     * <p>The state is intentionally not cleared in {@code close()}: keyed state is scoped to
     * the key of the element currently being processed, and {@code close()} is not invoked
     * for an element, so clearing there would either fail or affect only one arbitrary key.
     */
    public static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>> {
        /** Minimum absolute temperature difference that triggers a warning. */
        private final double threshold;

        /** Last temperature seen for the current key; assigned in {@link #open(Configuration)}. */
        private transient ValueState<Double> lastTempState;

        /**
         * @param threshold minimum absolute difference between consecutive temperatures
         *                  that triggers a warning; must not be {@code null}
         * @throws IllegalArgumentException if {@code threshold} is {@code null}
         */
        public TempChangeWarning(Double threshold){
            // Fail fast here instead of with an unboxing NullPointerException inside flatMap.
            if (threshold == null) {
                throw new IllegalArgumentException( "threshold must not be null" );
            }
            this.threshold = threshold;
        }

        /**
         * Obtains the keyed state handle from the runtime context.
         *
         * @param parameters configuration passed in by the runtime (unused)
         * @throws Exception if the state handle cannot be obtained
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            lastTempState = getRuntimeContext().getState( new ValueStateDescriptor<>( "lastTempState", Double.class ) );
        }

        /**
         * Compares the reading with the previous one of the same key and emits a warning
         * when the absolute difference reaches the threshold, then stores the current
         * temperature as the new "previous" value.
         *
         * @param value the incoming sensor reading
         * @param out   collector receiving {@code (id, previousTemperature, currentTemperature)}
         * @throws Exception if state access fails
         */
        @Override
        public void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {
            Double lastTemp = lastTempState.value();

            // A null state value means this is the first reading for the key: nothing to compare.
            if (lastTemp != null){
                double diff = Math.abs( value.getTemperature() - lastTemp );
                if (diff >= threshold ){
                    out.collect(new Tuple3<>(value.getId(),lastTemp,value.getTemperature()));
                }
            }

            // Always remember the latest temperature, whether or not a warning was emitted.
            lastTempState.update( value.getTemperature() );
        }
    }

}


